In [ ]:
cloudantHost='dtaieb.cloudant.com'
cloudantUserName='weenesserliffircedinvers'
cloudantPassword='72a5c4f939a9e2578698029d2bb041d775d088b5'
In [ ]:
airports = sqlContext.read.format("com.cloudant.spark").option("cloudant.host",cloudantHost)\
.option("cloudant.username",cloudantUserName).option("cloudant.password",cloudantPassword)\
.option("schemaSampleSize", "-1").load("flight-metadata")
airports.cache()
airports.registerTempTable("airports")
In [ ]:
import pixiedust
# Display the airports data
display(airports)
In [ ]:
flights = sqlContext.read.format("com.cloudant.spark").option("cloudant.host",cloudantHost)\
.option("cloudant.username",cloudantUserName).option("cloudant.password",cloudantPassword)\
.option("schemaSampleSize", "-1").load("pycon_flightpredict_training_set")
flights.cache()
flights.registerTempTable("training")
In [ ]:
# Display the flights data
display(flights)
In [ ]:
from pyspark.sql import functions as f
from pyspark.sql.types import *
rdd = flights.rdd.flatMap(lambda s: [s.arrivalAirportFsCode, s.departureAirportFsCode]).distinct()\
.map(lambda row:[row])
vertices = airports.join(
sqlContext.createDataFrame(rdd, StructType([StructField("fs",StringType())])), "fs"
).dropDuplicates(["fs"]).withColumnRenamed("fs","id")
print(vertices.count())
In [ ]:
edges = flights.withColumnRenamed("arrivalAirportFsCode","dst")\
.withColumnRenamed("departureAirportFsCode","src")\
.drop("departureWeather").drop("arrivalWeather").drop("pt_type").drop("_id").drop("_rev")
print(edges.count())
The GraphFrames package to install depends on the environment:
Spark 1.6: graphframes:graphframes:0.5.0-spark1.6-s_2.11
Spark 2.x: graphframes:graphframes:0.5.0-spark2.1-s_2.11
In addition, recent versions of GraphFrames depend on other packages which will also need to be installed:
com.typesafe.scala-logging:scala-logging-api_2.11:2.1.2
com.typesafe.scala-logging:scala-logging-slf4j_2.11:2.1.2
Note: After installing packages, the kernel will need to be restarted and all the previous cells re-run (including the install package cell).
In [ ]:
import pixiedust
if sc.version.startswith('1.6.'):  # Spark 1.6
    pixiedust.installPackage("graphframes:graphframes:0.5.0-spark1.6-s_2.11")
elif sc.version.startswith('2.'):  # Spark 2.1, 2.0
    pixiedust.installPackage("graphframes:graphframes:0.5.0-spark2.1-s_2.11")

# dependencies required by recent versions of GraphFrames, regardless of Spark version
pixiedust.installPackage("com.typesafe.scala-logging:scala-logging-api_2.11:2.1.2")
pixiedust.installPackage("com.typesafe.scala-logging:scala-logging-slf4j_2.11:2.1.2")
print("done")
In [ ]:
from graphframes import GraphFrame
g = GraphFrame(vertices, edges)
display(g)
The degree of a vertex is the number of edges incident to it. In a directed graph, the in-degree is the number of edges where the vertex is the destination and the out-degree is the number of edges where the vertex is the source. GraphFrames provides degrees, outDegrees and inDegrees properties that each return a DataFrame containing the id of the vertex and its number of edges. We then sort them in descending order.
In [ ]:
from pyspark.sql.functions import *
degrees = g.degrees.sort(desc("degree"))
display( degrees )
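The same pattern applies to the inDegrees and outDegrees properties mentioned above. The following is a minimal sketch (assuming the same graph g) that joins the two results to compare, for each airport, the number of incoming and outgoing flights:
In [ ]:
from pyspark.sql.functions import desc
# Sketch: compare incoming vs outgoing flights per airport
inOut = g.inDegrees.join(g.outDegrees, "id")
display( inOut.sort(desc("inDegree")) )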
For this we use the shortestPaths api, which returns a DataFrame containing the properties of each vertex plus an extra column called distances that contains the number of hops to each landmark.
In the following code, we use BOS and LAX as the landmarks:
In [ ]:
r = g.shortestPaths(landmarks=["BOS", "LAX"]).select("id", "distances")
display(r)
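Since distances is a map keyed by landmark id, individual values can be pulled out into regular columns. The following sketch (reusing the result r above) extracts the hop counts for both landmarks and keeps only the airports one hop away from BOS:
In [ ]:
# Sketch: extract per-landmark hop counts from the distances map column
hops = r.select("id",
    r["distances"]["BOS"].alias("hopsToBOS"),
    r["distances"]["LAX"].alias("hopsToLAX"))
display( hops.filter("hopsToBOS = 1") )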
PageRank is a famous algorithm used by Google Search to rank vertices in a graph by order of importance. To compute PageRank, we'll use the pageRank api, which returns a new graph in which the vertices have a new pagerank column representing the PageRank score for the vertex, and the edges have a new weight column representing the edge weight that contributed to the PageRank score. We'll then display the vertex ids and their associated PageRank scores, sorted in descending order:
In [ ]:
from pyspark.sql.functions import *
ranks = g.pageRank(resetProbability=0.20, maxIter=5)
rankedVertices = ranks.vertices.select("id","pagerank").orderBy(desc("pagerank"))
rankedEdges = ranks.edges.select("src", "dst", "weight").orderBy(desc("weight") )
ranks = GraphFrame(rankedVertices, rankedEdges)
display(ranks)
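Instead of running for a fixed number of iterations, pageRank can also run until the scores converge by passing a tolerance. This is a sketch (assuming the same graph g) that uses tol in place of maxIter:
In [ ]:
# Sketch: run PageRank until convergence (tol) rather than for a fixed number of iterations
convergedRanks = g.pageRank(resetProbability=0.20, tol=0.01)
display( convergedRanks.vertices.select("id", "pagerank").orderBy(desc("pagerank")) )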
In this section, we want to find all the routes between Boston and San Francisco operated by United Airlines with at most 2 hops. To accomplish this, we use the bfs (Breadth First Search) api, which returns a DataFrame containing the shortest paths between matching vertices. For clarity, we drop the from and to vertex columns when displaying the results.
In [ ]:
paths = g.bfs(fromExpr="id='BOS'",toExpr="id = 'SFO'",edgeFilter="carrierFsCode='UA'", maxPathLength = 2)\
.drop("from").drop("to")
paths.cache()
display(paths)
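The same api can answer simpler questions as well. As a sketch, the query below (reusing the same graph) finds only the direct flights between the two airports, regardless of carrier:
In [ ]:
# Sketch: direct flights only (a single edge), any carrier
directPaths = g.bfs(fromExpr="id = 'BOS'", toExpr="id = 'SFO'", maxPathLength=1)
display( directPaths )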
In this section, we'll use a very powerful GraphFrames search feature based on patterns called motifs to find vertices. We'll use the pattern "(a)-[]->(b);(b)-[]->(c);!(a)-[]->(c)", which searches for all vertices a, b and c such that there is an edge from a to b and an edge from b to c, but no edge from a to c.
Also, because the search is computationally expensive, we reduce the number of edges by grouping the flights that have the same src and dst.
In [ ]:
from pyspark.sql.functions import *
h = GraphFrame(g.vertices, g.edges.select("src","dst")\
.groupBy("src","dst").agg(count("src").alias("count")))
query = h.find("(a)-[]->(b);(b)-[]->(c);!(a)-[]->(c)").drop("b")
query.cache()
display(query)
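The result contains one struct column per named vertex in the pattern, so it can be filtered like any other DataFrame. For example, a sketch that keeps only the triplets starting in Boston (using the query computed above):
In [ ]:
# Sketch: filter the motif results on the attributes of the named vertices
display( query.filter("a.id = 'BOS'") )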
Strongly connected components are components in which every vertex is reachable from every other vertex. To compute them, we'll use the stronglyConnectedComponents api, which returns a DataFrame containing all the vertices with an additional component column that holds the id of the component to which the vertex belongs. We then group all the rows by component and count the member vertices. This gives us a good idea of the component distribution in the graph.
In [ ]:
from pyspark.sql.functions import *
components = g.stronglyConnectedComponents(maxIter=10).select("id","component")\
.groupBy("component").agg(count("id").alias("count")).orderBy(desc("count"))
display(components)
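To see which airports sit outside the main component, rather than just the component sizes, the component ids can be used to filter the vertices. The following sketch (assuming the same graph g) recomputes the components and keeps only the outliers; only the id and component columns are shown, since the other airport attributes vary by dataset:
In [ ]:
# Sketch: list the airports that fall outside the largest strongly connected component
scc = g.stronglyConnectedComponents(maxIter=10)
largestComponent = scc.groupBy("component").agg(count("id").alias("count"))\
    .orderBy(desc("count")).first()["component"]
display( scc.filter(scc["component"] != largestComponent).select("id", "component") )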
The Label Propagation algorithm is a popular algorithm for finding communities within a graph. It has the advantage of being computationally inexpensive and thus works well with large graphs. To compute the communities, we'll use the labelPropagation api, which returns a DataFrame containing all the vertices with an additional label column that holds the id of the community to which the vertex belongs. As with the strongly connected components, we then group all the rows by label and count the member vertices.
In [ ]:
from pyspark.sql.functions import *
communities = g.labelPropagation(maxIter=5).select("id", "label")\
.groupBy("label").agg(count("id").alias("count")).orderBy(desc("count"))
display(communities)
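To look at the membership of the biggest community rather than only the counts, the label ids can be used as a filter. This is a sketch that reruns the algorithm (label assignments can differ between runs) and keeps the id column:
In [ ]:
# Sketch: inspect the airports assigned to the largest community
lp = g.labelPropagation(maxIter=5)
largestLabel = lp.groupBy("label").agg(count("id").alias("count"))\
    .orderBy(desc("count")).first()["label"]
display( lp.filter(lp["label"] == largestLabel).select("id", "label") )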
The aggregateMessages api is not currently available in Python, so we use the PixieDust Scala bridge to call the Scala API. Note: PixieDust automatically rebinds the Python GraphFrame variable g into a Scala GraphFrame with the same name.
In [ ]:
%%scala
import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions.{avg,desc,floor}
// For each airport, average the delays of the departing flights
val msgToSrc = AggregateMessages.edge("deltaDeparture")
val __agg = g.aggregateMessages
  .sendToSrc(msgToSrc)                                         // send each flight delay to its source vertex
  .agg(floor(avg(AggregateMessages.msg)).as("averageDelays"))  // average up all delays per airport
  .orderBy(desc("averageDelays"))
  .limit(10)
__agg.cache()
__agg.show()
In [ ]:
display(__agg)
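For this particular aggregation (averaging an edge attribute per source vertex), a plain DataFrame groupBy on the edges gives a comparable result directly in Python. This is only a sketch, not the aggregateMessages API, and it assumes the same deltaDeparture column used in the Scala cell above:
In [ ]:
from pyspark.sql.functions import avg, floor, desc
# Sketch: average departure delay per origin airport, computed on the edges DataFrame
pyAgg = g.edges.groupBy("src").agg(floor(avg("deltaDeparture")).alias("averageDelays"))\
    .orderBy(desc("averageDelays")).limit(10)
display( pyAgg )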
In [ ]: